package com.sf.sgs.mark.exception.service.impl;

import com.sf.sgs.mark.exception.common.utils.KafkaUtils;
import com.sf.sgs.mark.exception.domain.KafkaConfig;
import com.sf.sgs.mark.exception.domain.KafkaData;
import com.sf.sgs.mark.exception.mapper.KafkaDataMapper;
import com.sf.sgs.mark.exception.service.IKafkaDataService;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by 01204808 on 2017/7/16.
 */
@Component
public class KafkaDataService implements IKafkaDataService {

    @Autowired
    private KafkaDataMapper kafkaDataMapper;
    @Autowired
    private KafkaConfigService kafkaConfigService;

    @Override
    public boolean addKafkaData(KafkaData kafkaData) {
        kafkaDataMapper.addKafkaData(kafkaData);
        KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(kafkaData.getTopicName());
        KafkaUtils.sendMessage(kafkaConfig.getKafkaUrl(), kafkaConfig.getTopicName(), kafkaConfig.getTopicToken(), kafkaConfig.getClusterName(), kafkaData.getMessage());
        return true;
    }

    @Override
    public List<KafkaData> queryKafkaData(String waybillNo, String topicName, int startRecord, int pageSize) {
        return kafkaDataMapper.findKafkaData(waybillNo, topicName, startRecord, pageSize);
    }

    @Override
    public int queryKafkaDataTotal(String waybillNo, String topicName) {
        return kafkaDataMapper.getKafkaDataTotal(waybillNo, topicName);
    }
}
